
DO NOT MERGE #8

Closed

attilapiros wants to merge 1 commit into master from SPARK-51272_attila_2

Conversation

@attilapiros
Owner

No description provided.

@github-actions github-actions bot added the CORE label Apr 8, 2025

I have two questions:

  1. The window which I was talking about is this, and I think it is present in the above code (see the sketch after these questions):
    a) The first result task failed, due to which the event loop thread submitted a ResubmitFailedStages message.
    b) But before this message was put in the queue, a new message for a successful result task arrived.
    So now the sequence of messages is: ResultTask (successful), followed by the ResubmitFailedStages message.
    The event loop thread picks up the successful result task, and now there is an output available.
    The event loop thread then picks up the ResubmitFailedStages message, sees stage.findMissingPartitions().length != rs.partitions.length,
    and proceeds to abort the stage (which I believe means the query will be aborted, right?).

  2. Why are you making a new stage attempt, given there is already a committed result (isn't it?). And as it cannot be reverted, the query needs to abort?
    (I may be wrong in my understanding of the code of DAGScheduler and stages, so please bear with me.)
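A minimal sketch of the ordering in question 1, with hypothetical message and variable names (this is not the real DAGScheduler code, just a model of the single-threaded event loop and the check mentioned above):

// Hypothetical, simplified model of the race described in question 1.
case class ResultTaskSucceeded(partitionId: Int)
case object ResubmitFailedStages

object OrderingSketch extends App {
  // Messages are handled one at a time by a single event-loop thread.
  val queue = scala.collection.mutable.Queue[Any](
    ResultTaskSucceeded(partitionId = 0), // arrived just before the resubmit message
    ResubmitFailedStages
  )

  val allPartitions = Set(0, 1)
  var missingPartitions = Set(0, 1) // nothing registered yet

  while (queue.nonEmpty) queue.dequeue() match {
    case ResultTaskSucceeded(p) =>
      missingPartitions -= p // an output is now available
    case ResubmitFailedStages =>
      // The check from the question: some output already exists for the
      // indeterminate result stage, so it cannot simply be resubmitted.
      if (missingPartitions.size != allPartitions.size) {
        println("abort stage (and therefore the query)")
      }
  }
}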


@attilapiros: also I am wondering if you had a clean build? Because I think in my changes, when I did something like this, I saw a valid indeterminate stage which had some partitions missing, but that code path was not from a failed task; it was a proper path.
So with this change, that path would also encounter an abort.

Owner Author


There was one failure in Streaming and one in SQL, but both were unrelated.

Owner Author


1.) There is no window. Case b) is actually tested by your createDagInterceptorForSpark51272, isn't it?
2.) That was a mistake.

@attilapiros attilapiros force-pushed the SPARK-51272_attila_2 branch from 2b3fb52 to 0967f90 on April 8, 2025 22:10
@ahshahid

ahshahid commented Apr 8, 2025 via email

@attilapiros
Owner Author

I am thinking about refactoring the tests.

@attilapiros
Owner Author

Actually we should abort even if the first task failed with a fetch failure and any successful completion follows (so ignoring it won't work).

The problem here is not the output registration but the temporary files written by the task body, or writes to any external database. When we get the Success event, those files have already been generated on the executor side.

@ahshahid

ahshahid commented Apr 9, 2025 via email

@ahshahid

ahshahid commented Apr 9, 2025 via email

@attilapiros
Owner Author

attilapiros commented Apr 9, 2025

Let's assume we have a job which writes to a table stored, for example, on HDFS. Let's say the indeterminate result stage contains 2 tasks: one fetching from hostA, the other from hostB. Fetching from hostA fails, so we do our part and post the resubmit-failed-stages message. But the other task is still running, as the fetch from hostB was successful. It even finishes successfully (we do not know when this happens, as it is on the executor side). So its result will be committed with Hadoop's FileOutputCommitter (where a task can either be aborted or committed, but I do not think you can commit and change your mind later to abort), and I do not think there is any guarantee about what happens if you recommit that Hadoop task again. So now, as we are in an indeterminate stage and we know that what is committed is outdated, our only option is to abort the Hadoop job. But that's outside of the Spark job:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala#L109
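For illustration, a rough sketch of the two-level commit protocol being referred to, driving the Hadoop OutputCommitter API directly (simplified; Spark's OutputCommitCoordinator and all error handling are omitted, and these helper functions are only an assumption about how a writer uses the committer):

import org.apache.hadoop.mapreduce.{JobContext, JobStatus, OutputCommitter, TaskAttemptContext}

// Executor side: each task attempt either commits or aborts its own output.
def runTaskAttempt(committer: OutputCommitter, taskCtx: TaskAttemptContext): Unit = {
  committer.setupTask(taskCtx)
  // ... the task body writes its temporary output files here ...
  if (committer.needsTaskCommit(taskCtx)) {
    committer.commitTask(taskCtx) // once committed, there is no way back to an abort
  } else {
    committer.abortTask(taskCtx)
  }
}

// Driver side: the job-level commit or abort happens exactly once, after all tasks have finished.
def finishJob(committer: OutputCommitter, jobCtx: JobContext, succeeded: Boolean): Unit = {
  if (succeeded) {
    committer.commitJob(jobCtx)
  } else {
    committer.abortJob(jobCtx, JobStatus.State.FAILED)
  }
}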

@ahshahid

ahshahid commented Apr 9, 2025 via email

@attilapiros
Owner Author

The FileOutputCommitter is invoked from the driver side (right?)

From the driver and the executor:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala#L97

The task commit happens on the executor side, as I described above.

@ahshahid

ahshahid commented Apr 9, 2025 via email

@ahshahid

ahshahid commented Apr 9, 2025 via email

@attilapiros
Owner Author

attilapiros commented Apr 10, 2025

FileOutputCommitter is an interface and it has different implementations. One successful test would not be sufficient here. So I was thinking about how to prove this without investing a ton of extra time, and I would choose another example:
writing the result via JDBC to an external DB.

Here you can see it iterates over the partitions and issues INSERT INTO statements:

repartitionedDF.foreachPartition { iterator =>
  savePartition(table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel, options)
}

where one INSERT INTO is SQL dialect specific:

val insertStmt = getInsertStatement(table, rddSchema, tableSchema, isCaseSensitive, dialect)

So if the fetch from one host failed and another succeeded, but you ignore the bookkeeping on the driver side and rerun all the tasks (treating all of them as missing after the failure), you will duplicate the rows.
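
A simplified sketch of why that duplicates rows, using plain JDBC instead of Spark's actual JdbcUtils.savePartition (the table and column names are made up):

import java.sql.DriverManager

// Each task writes its partition with INSERT INTO statements and commits at the end.
def savePartitionSketch(jdbcUrl: String, rows: Iterator[(Int, String)]): Unit = {
  val conn = DriverManager.getConnection(jdbcUrl)
  try {
    conn.setAutoCommit(false)
    val stmt = conn.prepareStatement("INSERT INTO target_table (id, value) VALUES (?, ?)")
    rows.foreach { case (id, value) =>
      stmt.setInt(1, id)
      stmt.setString(2, value)
      stmt.executeUpdate()
    }
    conn.commit() // the rows are now durable in the external database
  } finally {
    conn.close()
  }
}

// If the driver later treats this partition as missing and reruns the task,
// savePartitionSketch runs again with the same rows and every row ends up inserted twice,
// because there is no job-level transaction spanning both attempts.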

@ahshahid

ahshahid commented Apr 10, 2025 via email

@attilapiros
Owner Author

I will go through the example, but logically speaking the executors cannot
independently commit the final results, that commit needs to be done by a
coordinator which I believe is the driver.

There are two levels:

  • task level (executors)
  • job level (driver)

The driver does the job-level commit, as we have seen in the OutputCommitter case, outside of the Spark job.

Just think about inserting into an external DB. We are distributed, so a session transaction is off the table.

Granted, one successful test is not enough, but then there is no test
whatsoever to prove otherwise, at least as of now.
If a test can come through, I do believe it is worth the time and effort, as
it is related to data integrity.

OK, I started to run your test SparkHASuite to show you that even with the output committers we have a problem. But IMHO it is not testing the case with fetch failures. At least I cannot see any "with indeterminate output was failed" messages (which is an info-level log):

logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} with indeterminate output was failed, " +
log"we will roll back and rerun below stages which include itself and all its " +
log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}")

@ahshahid

I will run the HA test again today to check the logging which you are looking for.

I have not looked into the committing portion of the Spark code on executors, but it would seem odd to me if the executor's task-level commit did not carry the attempt number (which I do believe I see getting passed to the executor as part of the task request). So if the commits (when going to, say, a DB) are written against an attempt ID, the conflict or duplicate rows should not arise.
I will try to find out what information goes as part of committing the executor's task results.

@attilapiros
Owner Author

attilapiros commented Apr 11, 2025

Please add a log to DAGScheduler#handleTaskCompletion:

 event.reason match {
        case Success =>
          if (!isIndeterministicZombie) {
            task match {
              case rt: ResultTask[_, _] =>
                val resultStage = stage.asInstanceOf[ResultStage]
                resultStage.activeJob match {
                  case Some(job) =>
                    // Only update the accumulator once for each result task.
                    if (!job.finished(rt.outputId)) {
                      updateAccumulators(event)
                    }
                  case _ => // Ignore update if task's job has finished.
                }
              case _ => updateAccumulators(event)
            }
+         } else {
+           logError("IndeterministicZombie detected")
          }

And preferably your test should contain this line.
